Spring Boot integration with Hadoop 3.2.0 + HBase 2.3.0 + Spark 3.0.0

Add the Maven dependencies. The exclusions keep the transitive servlet-api and logging jars (slf4j-log4j12, log4j, logback-classic) pulled in by Hadoop, HBase and Spark from clashing with Spring Boot's own:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>${hadoop.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>${hadoop.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>${hbase.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>${hbase.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>${hbase.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-mapreduce</artifactId>
    <version>${hbase.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-annotations</artifactId>
    <version>${hbase.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>${spark.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
        </exclusion>
        <exclusion>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>${spark.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>${spark.version}</version>
</dependency>
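
The ${hadoop.version}, ${hbase.version} and ${spark.version} placeholders assume matching <properties> entries in the POM. A minimal sketch, using the versions from the title:

<properties>
    <hadoop.version>3.2.0</hadoop.version>
    <hbase.version>2.3.0</hbase.version>
    <spark.version>3.0.0</spark.version>
</properties>

The _2.12 suffix on the Spark artifacts pins the Scala 2.12 binary, which is what Spark 3.0.0 is built against.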

 

Create an HBaseBean class:

public class HBaseBean {

    public HBaseBean() {
    }

    /** row key of the HBase record */
    private String rowKey;

    /** column family */
    private String columnFamily;

    /** column qualifier (column name) */
    private String columnQualifier;

    /** cell timestamp */
    private Long timeStamp;

    /** cell type */
    private String type;

    /** cell value */
    private String value;

    public String getRowKey() {
        return rowKey;
    }

    public void setRowKey(String rowKey) {
        this.rowKey = rowKey;
    }

    public String getColumnFamily() {
        return columnFamily;
    }

    public void setColumnFamily(String columnFamily) {
        this.columnFamily = columnFamily;
    }

    public String getColumnQualifier() {
        return columnQualifier;
    }

    public void setColumnQualifier(String columnQualifier) {
        this.columnQualifier = columnQualifier;
    }

    public Long getTimeStamp() {
        return timeStamp;
    }

    public void setTimeStamp(Long timeStamp) {
        this.timeStamp = timeStamp;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }
}

Create an HBaseConfig class:

/**
 * HBase configuration class.
 * HBaseConfiguration.create() looks for hbase-site.xml on the CLASSPATH.
 *
 * @author child
 * @date 2020-7-14 12:11:18
 * @see <a href="https://hbase.apache.org/book.html#faq">official HBase FAQ</a>
 * @see <a href="http://c.biancheng.net/view/6523.html">HBase usage reference</a>
 */
@Configuration
public class HBaseConfig {

    @Bean
    public HbaseTemplate hbaseTemplate() {
        Connection connection = null;
        try {
            org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
            connection = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return new HbaseTemplate(connection);
    }

    @Bean
    public Admin admin() {
        Admin admin = null;
        try {
            org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
            Connection connection = ConnectionFactory.createConnection(conf);
            admin = connection.getAdmin();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return admin;
    }
}

Create an HbaseTemplate class that wraps the common operations:

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/**
 * DDL/DML operations against HBase (see also HBaseUtils).
 *
 * @Author wulincheng
 * @Date 2020-7-13 15:14:08
 * @Version 1.0
 */
public class HbaseTemplate {

    private Logger log = LoggerFactory.getLogger(this.getClass());

    /** HBase connection */
    private Connection connection;

    public HbaseTemplate() {
    }

    public HbaseTemplate(Connection connection) {
        setConnection(connection);
    }

    public Connection getConnection() {
        return connection;
    }

    private Admin getAdmin() throws IOException {
        return connection.getAdmin();
    }

    public void setConnection(Connection connection) {
        this.connection = connection;
    }

    /**
     * Get a {@link Table}.
     *
     * @param tableName table name
     */
    public Table getTable(String tableName) throws IOException {
        return connection.getTable(TableName.valueOf(tableName));
    }

    /**
     * Create a namespace.
     *
     * @param nameSpace namespace name
     */
    public void createNameSpace(String nameSpace) {
        Assert.hasLength(nameSpace, "namespace must not be empty");
        NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(nameSpace).build();
        try (Admin admin = getAdmin()) {
            admin.createNamespace(namespaceDescriptor);
        } catch (IOException e) {
            log.error("failed to create namespace [{}]", nameSpace, e);
        }
    }

    /**
     * Delete a namespace.
     *
     * @param nameSpace namespace name
     */
    public void deleteNameSpace(String nameSpace) {
        Assert.hasLength(nameSpace, "namespace must not be empty");
        try (Admin admin = getAdmin()) {
            admin.deleteNamespace(nameSpace);
        } catch (IOException e) {
            log.error("failed to delete namespace [{}]", nameSpace, e);
        }
    }

    /**
     * Create a table.
     *
     * @param tableName table name
     * @param CF        column families
     */
    public void createTable(String tableName, String... CF) {
        Assert.hasLength(tableName, "table name must not be empty");
        Assert.notEmpty(CF, "column families must not be empty");
        TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
        List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
        for (String columnFamily : CF) {
            ColumnFamilyDescriptor build = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamily)).build();
            columnFamilyDescriptors.add(build);
        }
        TableDescriptor tableDescriptor = tableDescriptorBuilder.setColumnFamilies(columnFamilyDescriptors).build();
        try (Admin admin = getAdmin()) {
            admin.createTable(tableDescriptor);
        } catch (IOException e) {
            log.error("failed to create table => {}", tableName, e);
        }
    }

    /**
     * Disable a table.
     *
     * @param tableName table name
     */
    public void disableTable(String tableName) {
        Assert.hasLength(tableName, "table name must not be empty");
        try (Admin admin = getAdmin()) {
            admin.disableTable(TableName.valueOf(tableName));
        } catch (IOException e) {
            log.error("failed to disable table => {}", tableName, e);
        }
    }

    /**
     * Delete a table.
     *
     * @param tableName table name
     */
    public void deleteTable(String tableName) {
        Assert.hasLength(tableName, "table name must not be empty");
        try (Admin admin = getAdmin()) {
            // a table must be disabled before it can be deleted
            disableTable(tableName);
            admin.deleteTable(TableName.valueOf(tableName));
        } catch (IOException e) {
            log.error("failed to delete table => {}", tableName, e);
        }
    }

    /**
     * List all tables in the given namespace.
     *
     * @param nameSpace namespace name
     * @return {@link TableName} list
     */
    public List<TableName> listTable(String nameSpace) {
        Assert.hasLength(nameSpace, "namespace must not be empty");
        try (Admin admin = getAdmin()) {
            TableName[] tableNames = admin.listTableNamesByNamespace(nameSpace);
            return (List<TableName>) CollectionUtils.arrayToList(tableNames);
        } catch (IOException e) {
            log.error("failed to list tables in namespace [{}]", nameSpace, e);
        }
        return null;
    }

    /**
     * List all tables in the default namespace.
     *
     * @return {@link TableName} list
     */
    public List<TableName> listTableByDefault() {
        return listTable("default");
    }

    /**
     * Scan a whole table.
     *
     * @param tableName table name
     */
    public List<HBaseBean> scanTable(String tableName) {
        Assert.hasLength(tableName, "table name must not be empty");
        Scan scan = new Scan();
        return getResult(tableName, scan);
    }

    /**
     * Run a scan and map every cell to an HBaseBean.
     * (Reconstructed: the original body was lost; this follows the call
     * site in scanTable and the toHBaseBeans helper below.)
     */
    private List<HBaseBean> getResult(String tableName, Scan scan) {
        try (Table table = getTable(tableName);
             ResultScanner scanner = table.getScanner(scan)) {
            return toHBaseBeans(scanner.iterator());
        } catch (IOException e) {
            log.error("failed to scan table => {}", tableName, e);
        }
        return null;
    }

    /**
     * Insert data.
     *
     * @param tableName    table name
     * @param rowKey       row key
     * @param columnFamily column family
     * @param columns      column qualifiers
     * @param values       values, positionally matching columns
     * @return true/false
     */
    public boolean putData(String tableName, String rowKey, String columnFamily, List<String> columns, List<String> values) {
        try {
            Table table = getAdmin().getConnection().getTable(TableName.valueOf(tableName));
            Put put = new Put(Bytes.toBytes(rowKey));
            for (int i = 0; i < columns.size(); i++) {
                put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columns.get(i)), Bytes.toBytes(values.get(i)));
            }
            table.put(put);
            table.close();
            return true;
        } catch (IOException e) {
            log.error("failed to put data into table => {}", tableName, e);
            return false;
        }
    }

    /**
     * Build split keys for a pre-split table.
     * For example, the split points [10, 20, 30] produce the regions
     * ( ,10] (10,20] (20,30] (30, ).
     *
     * @param keys split points, e.g. [10, 20, 30]
     * @return two-dimensional byte array of split keys
     */
    private byte[][] getSplitKeys(List<String> keys) {
        byte[][] splitKeys = new byte[keys.size()][];
        TreeSet<byte[]> rows = new TreeSet<>(Bytes.BYTES_COMPARATOR);
        for (String key : keys) {
            rows.add(Bytes.toBytes(key));
        }
        int i = 0;
        for (byte[] row : rows) {
            splitKeys[i] = row;
            i++;
        }
        return splitKeys;
    }

    public static HBaseBean toHBaseBean(Result rs) {
        List<Cell> cells = rs.listCells();
        HBaseBean hBaseBean = new HBaseBean();
        cells.forEach(cell -> {
            hBaseBean.setRowKey(Bytes.toString(CellUtil.cloneRow(cell)));
            hBaseBean.setColumnFamily(Bytes.toString(CellUtil.cloneFamily(cell)));
            hBaseBean.setColumnQualifier(Bytes.toString(CellUtil.cloneQualifier(cell)));
            hBaseBean.setTimeStamp(cell.getTimestamp());
            hBaseBean.setType(cell.getType().toString());
            hBaseBean.setValue(Bytes.toString(CellUtil.cloneValue(cell)));
        });
        return hBaseBean;
    }

    public static List<HBaseBean> toHBaseBeans(Result rs) {
        List<HBaseBean> hBaseBeans = new ArrayList<>();
        List<Cell> cells = rs.listCells();
        cells.forEach(cell -> {
            HBaseBean hBaseBean = new HBaseBean();
            hBaseBean.setRowKey(Bytes.toString(CellUtil.cloneRow(cell)));
            hBaseBean.setColumnFamily(Bytes.toString(CellUtil.cloneFamily(cell)));
            hBaseBean.setColumnQualifier(Bytes.toString(CellUtil.cloneQualifier(cell)));
            hBaseBean.setTimeStamp(cell.getTimestamp());
            hBaseBean.setType(cell.getType().toString());
            hBaseBean.setValue(Bytes.toString(CellUtil.cloneValue(cell)));
            hBaseBeans.add(hBaseBean);
        });
        return hBaseBeans;
    }

    public static List<HBaseBean> toHBaseBeans(Iterator<Result> resultIterator) {
        List<HBaseBean> hBaseBeans = new ArrayList<>();
        while (resultIterator.hasNext()) {
            Result rs = resultIterator.next();
            List<Cell> cells = rs.listCells();
            cells.forEach(cell -> {
                HBaseBean hBaseBean = new HBaseBean();
                hBaseBean.setRowKey(Bytes.toString(CellUtil.cloneRow(cell)));
                hBaseBean.setColumnFamily(Bytes.toString(CellUtil.cloneFamily(cell)));
                hBaseBean.setColumnQualifier(Bytes.toString(CellUtil.cloneQualifier(cell)));
                hBaseBean.setTimeStamp(cell.getTimestamp());
                hBaseBean.setType(cell.getType().toString());
                hBaseBean.setValue(Bytes.toString(CellUtil.cloneValue(cell)));
                hBaseBeans.add(hBaseBean);
            });
        }
        return hBaseBeans;
    }

    /**
     * Flatten a single row into a column-name => value map.
     */
    public static Map<String, String> getRowData(Result rs) {
        Map<String, String> column = new HashMap<>();
        column.put("rowKey", Bytes.toString(rs.getRow()));
        List<Cell> cells = rs.listCells();
        for (Cell cell : cells) {
            String columnName = Bytes.toString(CellUtil.cloneQualifier(cell));
            String columnValue = Bytes.toString(CellUtil.cloneValue(cell));
            column.put(columnName, columnValue);
        }
        return column;
    }

    public static void setStartAndStop(String startRow, String stopRow, Scan scan) {
        if (!StringUtils.isEmpty(startRow)) {
            scan.withStartRow(Bytes.toBytes(startRow));
        }
        if (!StringUtils.isEmpty(stopRow)) {
            scan.withStopRow(Bytes.toBytes(stopRow));
        }
    }
}
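
With these beans registered, the template can be injected like any other Spring bean. A minimal usage sketch (the service class and table/row names here are illustrative, not from the original post):

import java.util.Arrays;

import org.springframework.stereotype.Service;

@Service
public class HBaseDemoService { // hypothetical class name, for illustration only

    private final HbaseTemplate hbaseTemplate;

    public HBaseDemoService(HbaseTemplate hbaseTemplate) {
        this.hbaseTemplate = hbaseTemplate;
    }

    public void demo() {
        // create a table with one column family, write one row, read it back
        hbaseTemplate.createTable("demo_table", "cf");
        hbaseTemplate.putData("demo_table", "row-1", "cf",
                Arrays.asList("name", "age"), Arrays.asList("tom", "18"));
        hbaseTemplate.scanTable("demo_table")
                .forEach(b -> System.out.println(b.getColumnQualifier() + " = " + b.getValue()));
    }
}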

Create a SparkContextBean class to configure Spark:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import scala.Serializable;

@Configuration
@ConfigurationProperties(prefix = "spark")
public class SparkContextBean implements Serializable {

    // Spark installation directory
    private String sparkHome = "";

    // application name
    private String appName = "";

    // master URL
    private String master = "";

    @Bean
    @ConditionalOnMissingBean(SparkConf.class)
    public SparkConf sparkConf() throws Exception {
        SparkConf conf = new SparkConf()
                .setSparkHome(sparkHome)
                .setAppName(appName)
                .setMaster(master);
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        return conf;
    }

    @Bean
    @ConditionalOnMissingBean(JavaSparkContext.class)
    public JavaSparkContext javaSparkContext() throws Exception {
        return new JavaSparkContext(sparkConf());
    }

    public String getSparkHome() {
        return sparkHome;
    }

    public void setSparkHome(String sparkHome) {
        this.sparkHome = sparkHome;
    }

    public String getAppName() {
        return appName;
    }

    public void setAppName(String appName) {
        this.appName = appName;
    }

    public String getMaster() {
        return master;
    }

    public void setMaster(String master) {
        this.master = master;
    }
}
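
The @ConfigurationProperties(prefix = "spark") binding populates the three fields from the application configuration. A sketch of the matching application.properties entries; the values below are placeholders for your own cluster (master could equally be yarn or a spark://host:7077 URL):

spark.spark-home=/opt/spark
spark.app-name=springboot-spark-demo
spark.master=local[*]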

 

Finally, copy the hbase-site.xml already configured on your server into the root of the resources directory, and the integration is complete.
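
HBaseConfiguration.create() picks that file up from the classpath, as noted in HBaseConfig. For reference, a minimal hbase-site.xml carries at least the ZooKeeper quorum; the hostname and port below are placeholders for your own environment:

<configuration>
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>hbase-host</value>
    </property>
    <property>
        <name>hbase.zookeeper.property.clientPort</name>
        <value>2181</value>
    </property>
</configuration>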

 


